package com.cscec8b.ct.analysis.io;

import com.cscec8b.ct.common.util.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

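/**
 * @Copyright: Shanghai Definesys Company. All rights reserved.
 * @Description: MapReduce OutputFormat for Text/Text records whose RecordWriter
 *               targets the tb_call table through a JDBC connection obtained from JDBCUtil.
 * @author: chuhaitao
 * @since: 2019/2/15 19:52
 * @history: 1.2019/2/15 created by chuhaitao
 */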
public class MysqlTextOutFormat extends OutputFormat<Text, Text> {


    /**
     * Record writer that inserts one row into {@code tb_call} per output record.
     * Modelled on the nested writer of TextOutputFormat; it could equally be a
     * top-level class.
     *
     * <p>The JDBC connection and the prepared statement are created lazily on the
     * first {@link #write(Text, Text)} call, reused for every later record, and
     * released in {@link #close(TaskAttemptContext)}.
     */
    protected static class MysqlRecordWriter extends RecordWriter<Text, Text> {

        /**
         * Insert statement with one placeholder per parameter bound in {@code write}.
         *
         * NOTE(review): the column list of tb_call is not visible from this file, so
         * the insert is positional and assumes the table has exactly four columns.
         * TODO: name the target columns explicitly once the schema is confirmed.
         */
        private static final String INSERT_SQL = "insert into tb_call values (?, ?, ?, ?)";

        private Connection connection = null;
        private PreparedStatement statement = null;

        /**
         * Writes one output record by executing the insert statement.
         *
         * @param key    output key of the record
         * @param values output value of the record
         * @throws IOException          if no connection is available or the insert fails;
         *                              the underlying {@link SQLException} is kept as the cause
         * @throws InterruptedException declared by the overridden method; not thrown here
         */
        @Override
        public void write(Text key, Text values) throws IOException, InterruptedException {
            // Acquire the connection once instead of once per record; previously every
            // call replaced the field and only the last connection was ever closed.
            if (connection == null) {
                connection = JDBCUtil.get();
                if (connection == null) {
                    throw new IOException("JDBCUtil.get() returned no connection");
                }
            }
            try {
                if (statement == null) {
                    statement = connection.prepareStatement(INSERT_SQL);
                }
                // NOTE(review): the bound values are the constants already present in
                // this method; key and values are not mapped to columns yet.
                // TODO: derive the four parameters from key/values.
                statement.setInt(1, 1);
                statement.setInt(2, 1);
                statement.setInt(3, 1);
                statement.setInt(4, 1);
                statement.executeUpdate();
            } catch (SQLException e) {
                // Surface the failure to the framework instead of silently dropping the record.
                throw new IOException("Failed to insert record into tb_call", e);
            }
        }

        /**
         * Releases the prepared statement and the connection, if they were created.
         * Failures while closing are reported on stderr and otherwise ignored, so
         * that a close error does not fail a task whose records were already written.
         *
         * @param taskAttemptContext context of the task attempt being closed (unused)
         * @throws IOException          declared by the overridden method; not thrown here
         * @throws InterruptedException declared by the overridden method; not thrown here
         */
        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                } finally {
                    statement = null;
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                } finally {
                    connection = null;
                }
            }
        }
    }


    /**
     * Creates the writer that receives the reducer output.
     *
     * @param taskAttemptContext context of the current task attempt (unused)
     * @return a new {@link MysqlRecordWriter}
     * @throws IOException          declared by the overridden method; not thrown here
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        return new MysqlRecordWriter();
    }

    /**
     * Performs no validation of the job's output specification.
     *
     * @param jobContext context of the job being submitted (unused)
     * @throws IOException          declared by the overridden method; not thrown here
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {

    }


    /**
     * Reads the output directory configured for the job.
     *
     * @param job job whose configuration is consulted
     * @return the path stored under {@code mapreduce.output.fileoutputformat.outputdir},
     *         or {@code null} if that property is not set
     */
    public static Path getOutputPath(JobContext job) {
        String name = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir");
        return name == null ? null : new Path(name);
    }


    /** Committer created on first request and returned for every later request. */
    private FileOutputCommitter committer = null;

    /**
     * Returns the output committer, creating it on the first call (modelled on
     * FileOutputFormat).
     *
     * <p>The committer is built from {@link #getOutputPath(JobContext)}, which is
     * {@code null} when no output directory is configured.
     *
     * @param context context of the current task attempt
     * @return the cached {@link FileOutputCommitter}
     * @throws IOException          if the committer cannot be created
     * @throws InterruptedException declared by the overridden method; not thrown here
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (this.committer == null) {
            Path output = getOutputPath(context);
            this.committer = new FileOutputCommitter(output, context);
        }

        return this.committer;
    }
}
